Conversation


@oerling oerling commented Oct 30, 2025

ToGraph builds derived tables (dts) bottom up. On the returning edge of the recursion we add joins or dt postprocessing steps like group by or limit. When an operation does not fit the implied processing order of a dt, e.g. joins, group by, having, order by, limit/offset, we wrap the plan so far in another dt. For example,

scan, limit, filter, limit

would have (scan, limit) in one dt, and the filter and second limit in a dt that contains the first one. Now, when a dt like the above is on the right of a join, we must start the right-side dt from scratch: its set of tables must start empty and must not contain tables from the left side. On the other hand, when the right side does not introduce a dt break, we must add its tables to the dt where the left side's tables were added.

To this effect, we now set and check allowedInDt correctly for filters as well.

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Oct 30, 2025

meta-codesync bot commented Oct 30, 2025

@oerling has imported this pull request. If you are a Meta employee, you can view this in D85860708.

}
}

TEST_F(PlanTest, outerJoinWithInnerJoin) {
Collaborator


This test passes on current main. Is that expected?

Comment on lines +30 to +31
#define AXIOM_ASSERT_PLAN(plan, matcher) \
ASSERT_TRUE(matcher->match(plan)) << plan->toString(true, true);

Collaborator


This macro is already defined in this file.

Comment on lines 1863 to 2049

// For example, on the right of a left outer join, a filter must not go to the enclosing dt but must make its own dt.
if (!contains(allowedInDt, PlanType::kFilterNode)) {
return wrapInDt(node);
}

Collaborator

@MBkkt MBkkt Oct 31, 2025


There's no test for this.
Even if we allow filters on the right side of a non-inner join, the outerJoinWithInnerJoin test still passes.

@oerling oerling force-pushed the pushd-fix-pr branch 6 times, most recently from c585092 to 9454b48 Compare November 10, 2025 07:53
@oerling oerling force-pushed the pushd-fix-pr branch 3 times, most recently from 6281f34 to 77b66bb Compare November 16, 2025 19:14
mbasmanova added a commit to mbasmanova/verax that referenced this pull request Nov 21, 2025
Summary: Extracted from facebookincubator#573

Differential Revision: D87333918
mbasmanova added a commit to mbasmanova/verax that referenced this pull request Nov 21, 2025
Summary:

Extracted from facebookincubator#573

Co-authored-by: oerling

Differential Revision: D87333918
mbasmanova added a commit to mbasmanova/verax that referenced this pull request Nov 25, 2025
Summary:
Fix cardinality estimate for aggregation. This fixes the plan for TPC-H q16.

Extracted from facebookincubator#573

Co-authored-by: oerling

Differential Revision: D87333918
meta-codesync bot pushed a commit that referenced this pull request Nov 25, 2025
Summary:
Pull Request resolved: #639

Fix cardinality estimate and planning logic for aggregations. This fixes the plan for TPC-H q16.

Aggregation is planned as follows:

If 1 worker and 1 thread: all aggregations are planned as single aggregations.

A global aggregation is always planned as partial + final, since combining values on a single thread is never better than combining them on multiple threads with a guaranteed reduction to one row per thread.

Otherwise we compare a plan with a partial aggregation to a plan with a single aggregation and pick the one with the lower cost. See Optimization::addAggregation().

For use cases where statistics are not available, we provide an option to disable cost-based decisions and always split aggregation into partial + final: alwaysPlanPartialAggregation.

The cost of group-by is in Aggregation::setCostWithGroups(). This has an auxiliary function maxGroups() which calculates the maximum number of possible combinations of grouping keys.

We distinguish between the number of input rows and the number of distinct rows in the domain the actual input is drawn from.

We calculate the size of the input domain by grouping the keys by table. If a key depends on many tables, we assign the key to its largest table. If there is a single grouping key per table, we use its cardinality, which is always <= the cardinality of the table. If there are multiple keys from the same table, we use a saturating product function to multiply the key cardinalities, capped at the table cardinality, for the count of distinct combinations. If the product is far from the max, this behaves like a multiplication; as the product goes to infinity, the function approaches the max. In this way, more keys produce more groups but never overflow the table size.

If we have many tables, we combine the per-table cardinalities with a saturating product. Here the maximum is the greater of 3 * the largest table size and an arbitrary 1e10, a proxy for a very large group by (10 billion groups).

Having determined the size of the pool from which the input is drawn, we use the coupon collector formula to see what reduction to expect. The size of the group-by is expectedNumDistincts(inputRowCount, maxGroups).

expectedNumDistincts uses the coupon collector formula to estimate how many distinct values will occur in n input tuples when there are k possible distinct tuples in the input.

We apply this formula to get the total expected fanout: numDistinct / numInput. This is always <= 1.

## Cost of partial aggregation

The partial capacity is the number of rows at the predicted width that fit in the memory budget for partial aggregation.

We see how many rows of input it takes to produce this many distinct groups in the input of the partial aggregation. With few groups this is a large number; with every row unique it equals the partial aggregation's capacity. The reduction from partial is the partial capacity divided by the number of input rows needed to reach it. This reduction can, however, not be greater (a smaller fanout) than the total reduction.

Given this reduction and the reduction expected in the first 'abandon partial min rows' rows, we predict whether the partial aggregation will be abandoned at run time.

We set the cost and cardinality accordingly. Abandoned partial has a fanout of 1 and a low cost per byte of input row.

## Cost of final / single aggregation

If we are planning a final aggregation, we scale the input by the reduction expected from partial. This reduces the number of rows that will be shuffled into the final aggregation but does not reduce the size of the hash table. The final table will in the end contain all distinct groups from the input.

We check if we have a local exchange, which is the case when there is more than one thread per worker. If so, we add a fraction of the shuffle cost as the cost of the local exchange.

The cost consists of unitCost (per-row cost), total memory, and fanout. The unitCost formula is the same for partial, final, and single aggregations. The cost includes:

- Compute the hash of the grouping keys := Costs::kHashColumnCost * numKeys
- Lookup the hash in the hash table := Costs::hashTableCost(nOut)
- Compare the keys := kKeyCompareCost * numKeys
- Access the row of the hash table (twice) := 2 * Costs::hashRowCost(nOut, rowBytes)
- Update accumulators := aggregates.size() * Costs::kSimpleAggregateCost

The memory estimate is the estimated row size plus a typical overhead of 12 bytes. The table size is the number of distinct values at 12 bytes per value.

The formula represents the added memory cost for larger tables. A more precise formula can be learned from examples.

The intended function of these mechanisms is to decide between partial + final aggregation and a single level of aggregation. Additionally, the cost should be comparable to the costs predicted for other operations like joins and shuffles.

Extracted from #573

Reviewed By: xiaoxmeng

Differential Revision: D87333918

fbshipit-source-id: 23ffab1ace0ff3332491d33d847fb4dec7cbe428

Co-authored-by: oerling